package com.atguigu.gmall.realtime.utils;

import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.List;

/**
 * Utility methods for working with HBase: opening and closing synchronous and
 * asynchronous client connections, creating and dropping tables, writing and
 * deleting rows, looking up a row as JSON, and building the Flink SQL DDL
 * fragments for the HBase connector.
 *
 * @author Felix
 * @date 2023/7/28
 */
public class HbaseUtil {
    /** ZooKeeper quorum that every client connection created by this class points at. */
    private static final String ZOOKEEPER_QUORUM = "hadoop102,hadoop103,hadoop104";

    /**
     * Opens a synchronous HBase connection against the configured ZooKeeper quorum.
     *
     * @return a new connection; the caller is responsible for closing it
     * @throws RuntimeException wrapping the underlying {@link IOException} if the connection cannot be created
     */
    public static Connection getHbaseConnection() {
        Configuration conf = newClientConf();
        try {
            return ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            // Keep the cause so the real reason for the failure is visible to the caller.
            throw new RuntimeException("Failed to create HBase connection to " + ZOOKEEPER_QUORUM, e);
        }
    }

    /**
     * Closes a synchronous connection if it is non-null and still open.
     * A failure while closing is printed and otherwise ignored.
     *
     * @param conn connection to close; may be {@code null}
     */
    public static void closeHbaseConnection(Connection conn) {
        if (conn != null && !conn.isClosed()) {
            try {
                conn.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Creates a table with the given column families unless it already exists.
     * Nothing is created when no column family is supplied. I/O failures are
     * printed and not propagated.
     *
     * @param conn      open HBase connection
     * @param namespace namespace of the table
     * @param tableName table name without the namespace
     * @param families  column families to create; at least one is required
     */
    public static void createTable(Connection conn, String namespace, String tableName, String... families) {
        // A null varargs array is treated the same as "no families given".
        if (families == null || families.length < 1) {
            System.out.println("建表至少需要一个列族");
            return;
        }

        try (Admin admin = conn.getAdmin()) {
            TableName tableNameObj = TableName.valueOf(namespace, tableName);
            if (admin.tableExists(tableNameObj)) {
                System.out.println("要创建" + namespace + ":" + tableName + "的表已经存在");
                return;
            }
            System.out.println("在hbase中创建" + namespace + ":" + tableName + "表");
            TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableNameObj);
            for (String family : families) {
                ColumnFamilyDescriptor columnFamilyDescriptor
                    = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).build();
                tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
            }
            admin.createTable(tableDescriptorBuilder.build());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Drops a table if it exists. I/O failures are printed and not propagated.
     *
     * @param conn      open HBase connection
     * @param namespace namespace of the table
     * @param tableName table name without the namespace
     */
    public static void dropTable(Connection conn, String namespace, String tableName) {
        try (Admin admin = conn.getAdmin()) {
            TableName tableNameObj = TableName.valueOf(namespace, tableName);
            if (!admin.tableExists(tableNameObj)) {
                System.out.println("要删除的" + namespace + ":" + tableName + "表不存在");
                return;
            }
            System.out.println("删除" + namespace + ":" + tableName + "表");
            // Disabling an already-disabled table fails, which would leave the table
            // undeleted; only disable when it is currently enabled.
            if (admin.isTableEnabled(tableNameObj)) {
                admin.disableTable(tableNameObj);
            }
            admin.deleteTable(tableNameObj);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Deletes one row from a table.
     *
     * @param conn      open HBase connection
     * @param namespace namespace of the table
     * @param tableName table name without the namespace
     * @param rowkey    row key of the row to delete
     * @throws IOException if the delete or closing the table fails
     */
    public static void delRow(Connection conn, String namespace, String tableName, String rowkey) throws IOException {
        TableName tableNameObj = TableName.valueOf(namespace, tableName);
        // try-with-resources releases the table even when the delete throws.
        try (Table table = conn.getTable(tableNameObj)) {
            table.delete(new Delete(Bytes.toBytes(rowkey)));
        }
    }

    /**
     * Puts one row into a table. {@code columns[i]} is written with {@code values[i]}
     * under the given column family; columns whose value is {@code null} are skipped.
     *
     * @param conn      open HBase connection
     * @param namespace namespace of the table
     * @param tableName table name without the namespace
     * @param rowkey    row key of the row to write
     * @param family    column family all columns are written to
     * @param columns   column qualifiers
     * @param values    values, positionally matching {@code columns}
     * @throws IOException              if the put or closing the table fails
     * @throws IllegalArgumentException if {@code values} has fewer elements than {@code columns}
     */
    public static void putRow(Connection conn, String namespace, String tableName, String rowkey, String family, String[] columns, String[] values) throws IOException {
        // Fail early with a clear message instead of an ArrayIndexOutOfBoundsException mid-loop.
        if (values.length < columns.length) {
            throw new IllegalArgumentException(
                "values has " + values.length + " elements but columns has " + columns.length);
        }

        TableName tableNameObj = TableName.valueOf(namespace, tableName);
        // try-with-resources releases the table even when the put throws.
        try (Table table = conn.getTable(tableNameObj)) {
            Put put = new Put(Bytes.toBytes(rowkey));
            for (int i = 0; i < columns.length; i++) {
                String value = values[i];
                if (value != null) {
                    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(columns[i]), Bytes.toBytes(value));
                }
            }
            table.put(put);
        }
    }

    /**
     * Builds the Flink SQL {@code CREATE TABLE} statement for the {@code base_dic}
     * lookup table backed by the HBase table {@code gmall0222_realtime:dim_base_dic}.
     *
     * @return the complete DDL statement
     */
    public static String getBaseDicLookUpDDL() {
        return "CREATE TABLE base_dic (\n" +
            " dic_code string,\n" +
            " info ROW<dic_name string>,\n" +
            " PRIMARY KEY (dic_code) NOT ENFORCED\n" +
            ") " + getHbaseDDL("gmall0222_realtime:dim_base_dic");
    }

    /**
     * Builds the {@code WITH (...)} clause of a Flink SQL table that uses the
     * {@code hbase-2.2} connector, with async lookup and a partial lookup cache
     * (500 rows max, entries expiring 1 hour after write or access).
     *
     * @param tableName HBase table name placed in the {@code table-name} option
     * @return the {@code WITH (...)} clause
     */
    public static String getHbaseDDL(String tableName) {
        return "WITH (\n" +
            " 'connector' = 'hbase-2.2',\n" +
            " 'table-name' = '" + tableName + "',\n" +
            " 'zookeeper.quorum' = 'hadoop102,hadoop103,hadoop104:2181',\n" +
            " 'lookup.async' = 'true',\n" +
            " 'lookup.cache' = 'PARTIAL',\n" +
            " 'lookup.partial-cache.max-rows' = '500',\n" +
            " 'lookup.partial-cache.expire-after-write' = '1 hour',\n" +
            " 'lookup.partial-cache.expire-after-access' = '1 hour'\n" +
            ")";
    }

    /**
     * Looks up one row by row key and returns its cells as a JSON object keyed by
     * column qualifier.
     *
     * @param conn      open HBase connection
     * @param namespace namespace of the table
     * @param tableName table name without the namespace
     * @param rowkey    row key to look up
     * @return the row as JSON, or {@code null} if the row has no cells or the read fails
     */
    public static JSONObject getDimInfoFromHbase(Connection conn, String namespace, String tableName, String rowkey) {
        TableName tableNameObj = TableName.valueOf(namespace, tableName);
        try (Table table = conn.getTable(tableNameObj)) {
            Get get = new Get(Bytes.toBytes(rowkey));
            return toJsonObject(table.get(get));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Opens an asynchronous HBase connection against the configured ZooKeeper
     * quorum, blocking until the connection is ready.
     *
     * @return a new asynchronous connection; the caller is responsible for closing it
     * @throws RuntimeException wrapping the underlying failure if the connection cannot be created
     */
    public static AsyncConnection getAsyncConnection() {
        Configuration conf = newClientConf();
        try {
            return ConnectionFactory.createAsyncConnection(conf).get();
        } catch (Exception e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException("Failed to create async HBase connection to " + ZOOKEEPER_QUORUM, e);
        }
    }

    /**
     * Closes an asynchronous connection if it is non-null and still open.
     * A failure while closing is printed and otherwise ignored.
     *
     * @param conn connection to close; may be {@code null}
     */
    public static void closeAsyncConnection(AsyncConnection conn) {
        if (conn != null && !conn.isClosed()) {
            try {
                conn.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Looks up one row by row key through the asynchronous client and returns its
     * cells as a JSON object keyed by column qualifier. The call blocks until the
     * asynchronous get completes.
     *
     * @param asyncConnection connection that supports asynchronous operations
     * @param nameSpace       namespace of the table
     * @param tableName       dimension table name without the namespace
     * @param rowKey          row key to look up
     * @return the row as JSON, or {@code null} if the row has no cells or the read fails
     */
    public static JSONObject getDimInfoFromHbaseByAsync(AsyncConnection asyncConnection,
                                                        String nameSpace,
                                                        String tableName,
                                                        String rowKey) {
        TableName tableNameObj = TableName.valueOf(nameSpace, tableName);
        AsyncTable<AdvancedScanResultConsumer> asyncTable = asyncConnection.getTable(tableNameObj);
        Get get = new Get(Bytes.toBytes(rowKey));
        try {
            return toJsonObject(asyncTable.get(get).get());
        } catch (Exception e) {
            // Restore the interrupt flag; the lookup itself stays best-effort and returns null.
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Manual smoke test: reads row {@code "1"} of
     * {@code gmall0222_realtime:dim_base_trademark} through the asynchronous
     * client and prints it.
     */
    public static void main(String[] args) {
        AsyncConnection conn = getAsyncConnection();
        try {
            System.out.println(getDimInfoFromHbaseByAsync(conn, "gmall0222_realtime", "dim_base_trademark", "1"));
        } finally {
            // Close the connection even if the lookup throws unexpectedly.
            closeAsyncConnection(conn);
        }
    }

    /** Builds the client configuration shared by the synchronous and asynchronous connections. */
    private static Configuration newClientConf() {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM);
        return conf;
    }

    /**
     * Converts the cells of a get result into a JSON object mapping each column
     * qualifier to its value as a string. The column family is not part of the
     * key, so equal qualifiers in different families overwrite each other.
     *
     * @return the JSON object, or {@code null} if the result is null or has no cells
     */
    private static JSONObject toJsonObject(Result result) {
        if (result == null) {
            return null;
        }
        List<Cell> cells = result.listCells();
        if (cells == null || cells.isEmpty()) {
            return null;
        }
        JSONObject jsonObj = new JSONObject();
        for (Cell cell : cells) {
            String columnName = Bytes.toString(CellUtil.cloneQualifier(cell));
            String columnValue = Bytes.toString(CellUtil.cloneValue(cell));
            jsonObj.put(columnName, columnValue);
        }
        return jsonObj;
    }
}
